/*
 * Copyright (2022) Bytedance Ltd. and/or its affiliates
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <Analyzers/QualifiedName.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>

#include <vector>
#include <list>
#include <memory>

namespace DB
{

struct FieldDescription;
using FieldDescriptions = std::vector<FieldDescription>;
struct Scope;
using ScopePtr = const Scope *;

namespace ErrorCodes
{
    extern const int AMBIGUOUS_IDENTIFIER;
}

struct FieldDescription
{
    String name;
    DataTypePtr type;

    /// ANSI SQL specific.
    QualifiedName prefix;

    struct OriginColumn
    {
        StoragePtr storage;
        StorageMetadataPtr metadata_snapshot;
        IAST * table_ast;
        String column;
        size_t index_of_scope;
    };

    using OriginColumns = std::vector<OriginColumn>;

    /// track where this field comes from
    OriginColumns origin_columns;

    /// ANSI SQL specific.
    /// Materialized columns, alias columns and virtual columns are not substituted by asterisks,
    /// but derived columns should always be substituted. Only valid for column fields.
    /// s.a. https://clickhouse.com/docs/zh/sql-reference/statements/create/table/#materialized
    bool substituted_by_asterisk;

    bool can_be_array_joined;

    // constructor for computed columns
    FieldDescription(String name_, DataTypePtr type_, QualifiedName prefix_ = {})
        : FieldDescription(std::move(name_), std::move(type_), std::move(prefix_), {}, true, true)
    {
    }

    // full costructor
    FieldDescription(
        String name_,
        DataTypePtr type_,
        QualifiedName prefix_,
        OriginColumns origin_columns_,
        bool substituted_by_asterisk_,
        bool can_be_array_joined_)
        : name(std::move(name_))
        , type(std::move(type_))
        , prefix(std::move(prefix_))
        , origin_columns(std::move(origin_columns_))
        , substituted_by_asterisk(substituted_by_asterisk_)
        , can_be_array_joined(can_be_array_joined_)
    {
    }

    // constructor for table columns
    FieldDescription(
        String name_,
        DataTypePtr type_,
        QualifiedName prefix_,
        StoragePtr origin_table_,
        StorageMetadataPtr origin_metadata_snapshot_,
        IAST * origin_table_ast_,
        String origin_column_,
        size_t index_of_origin_scope_,
        bool substituted_by_asterisk_,
        bool can_be_array_joined_)
        : name(std::move(name_))
        , type(std::move(type_))
        , prefix(std::move(prefix_))
        , origin_columns{OriginColumn{
              std::move(origin_table_),
              std::move(origin_metadata_snapshot_),
              origin_table_ast_,
              std::move(origin_column_),
              index_of_origin_scope_}}
        , substituted_by_asterisk(substituted_by_asterisk_)
        , can_be_array_joined(can_be_array_joined_)
    {
    }

    FieldDescription withNewName(const String & new_name) const;
    FieldDescription withNewPrefix(const QualifiedName & new_prefix) const;
    bool hasOriginInfo() const { return !origin_columns.empty(); }
    void copyOriginInfo(const FieldDescription & source_field);
    String getOriginColumnName() const
    {
        assert(origin_columns.size() == 1);
        return origin_columns.front().column;
    }

    // ANSI sql
    bool matchName(const QualifiedName & target_name) const;

    // Clickhouse SQL
    bool matchName(const String & target_name) const;
};

struct ResolvedField
{
    ScopePtr scope;
    size_t local_index;
    size_t hierarchy_index;

    ResolvedField(ScopePtr scope_, size_t local_index_);

    ResolvedField() = default;

    const FieldDescription & getFieldDescription() const;
};

/// Scope defines which names can be seen in current clause, thus can be used to analyze expressions.
///
/// Parent scope:
///   Scope is a hierarchical structure. If a name can not be resolved in current scope, it will delegate
///   to its parent to resolve, until to the root scope which has no parent.
///
/// Scope Type:
///   A RELATION scope is generated by a table scan/join...
///   A LAMBDA scope is generated when we enter a lambda function.
///
/// Scopes of a ASTSelectQuery:
///     Source scope: is generated by the FROM clause, used to analyze WHERE/GROUP BY/HAVING/SELECT.
///
/// Outer query scope:
///   Within a subquery expression(IN Subquery, scalar subquery, EXISTS subquery), all scopes have the outer
///   query's scope as their parent(directly or indirectly). Thus we can analyze co-related subqueries.
struct Scope
{
    enum class ScopeType
    {
        RELATION,
        LAMBDA
    };

    Scope() = delete;
    Scope(ScopeType type_, ScopePtr parent_, bool query_boundary_, FieldDescriptions field_description_)
        : type(type_), parent(parent_), query_boundary(query_boundary_), field_descriptions(std::move(field_description_))
    {
        if (!query_boundary_ && !parent)
            throw Exception("A non-query-boundary scope must have a parent scope.", ErrorCodes::LOGICAL_ERROR);
    }
private:
    ScopeType type;
    ScopePtr parent = nullptr;
    bool query_boundary;
    FieldDescriptions field_descriptions;
public:
    ScopeType getType() const { return type; }

    const FieldDescriptions & getFields() const { return field_descriptions; }

    /// For ansi SQL, a qualified name will be resolved by below rules:
    ///   1) identifer's last name equal to field's name;
    ///   2) identifer's name prefix is a suffix of field's name prefix.
    ///      (Field: tpcds.store_sales.ss_store_sk, Identifier: store_sales.ss_store_sk)
    /// Ambiguous resolution is not allowed
    std::optional<ResolvedField> resolveFieldByAnsi(const QualifiedName & target_name) const
    {
        return resolveField<QualifiedName, true>(target_name);
    }

    /// For clickhouse SQL, qualified names have been normalized to canonical names in rewriting phase
    /// (see also TranslateQualifiedNamesVisitor). If there are multiple fields with a same canonical
    /// name, only the first fields will be used.
    /// E.g. suppose we have tables: t1 (`s` String, `t2.s` String)
    ///                              t2 (`s` String)
    /// "SELECT t2.s FROM t1, t2" will output the data of column `t2.s` of table t1.
    std::optional<ResolvedField> resolveFieldByClickhouse(const String & target_name) const
    {
        return resolveField<String, false>(target_name);
    }

    const FieldDescription & at(size_t n) const
    {
        return field_descriptions.at(n);
    }

    size_t size() const
    {
        return field_descriptions.size();
    }

    ScopePtr getLocalParent() const; // parent scope of current query, null if `this` is a root scope
    ScopePtr getQueryBoundaryScope() const; // root scope of current query, nonnull
    ScopePtr getOuterQueryScope() const; // first scope of outer query, null if outer query not exists
    bool hasOuterQueryScope(ScopePtr other) const;
    bool isLocalScope(ScopePtr other) const;
    size_t getHierarchyOffset() const;
    size_t getHierarchySize() const;
    Names getOriginColumns() const;
    NameSet getNamesSet() const;
    String toString() const;

private:
    template <typename T, bool check_ambiguous>
    std::optional<ResolvedField> resolveField(const T & target) const
    {
        ScopePtr scope = this;

        while (scope)
        {
            std::vector<size_t> matches;

            for (size_t i = 0; i < scope->size(); ++i)
            {
                if (scope->at(i).matchName(target))
                {
                    if constexpr (check_ambiguous)
                        matches.push_back(i);
                    else
                        return ResolvedField(scope, i);
                }
            }

            if constexpr (check_ambiguous)
            {
                if (matches.size() > 1)
                    throw Exception("Identifier " + target.toString() + " is ambiguous.", ErrorCodes::AMBIGUOUS_IDENTIFIER);
                else if (matches.size() == 1)
                    return ResolvedField(scope, matches.front());
            }

            scope = scope->parent;
        }

        return std::nullopt;
    }
};

struct ScopeFactory
{
    std::list<Scope> scopes;

    ScopePtr createScope(Scope::ScopeType type, ScopePtr parent, bool query_boundary, FieldDescriptions field_descriptions);
    ScopePtr createLambdaScope(ScopePtr parent, FieldDescriptions field_descriptions);
};

}
